



import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Small batch demo of tuple projection on {@code DataSet}s.
 *
 * <p>Both input CSV files are looked up under {@code <user.dir>/dataset_api/Java}.
 */
class Projection {

    /**
     * Reads {@code projection.csv} and {@code hint.csv}, then prints the raw input,
     * a two-field projection of it, and the distinct values of the first column of
     * the second file.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the calls made in this method
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        System.out.println("---------------------------------");
        final String rootPath = System.getProperty("user.dir").concat("/dataset_api/Java");
        System.out.println(rootPath);

        // Source rows are typed as (Integer, Double, String).
        final DataSet<Tuple3<Integer, Double, String>> in =
                env.readCsvFile(csvUri(rootPath, "projection.csv"))
                        .types(Integer.class, Double.class, String.class);
        in.print();

        System.out.println("-----------------------Projection of Tuple DataSet-------------------------------------------");
        // Keep fields 2 and 0 (in that order): Tuple3<Integer, Double, String> -> Tuple2<String, Integer>.
        final DataSet<Tuple2<String, Integer>> out = in.project(2, 0);
        out.print();

        System.out.println("----------------------Projection with Type Hint------------------------------------------");

        // Five String columns per row.
        final DataSet<Tuple5<String, String, String, String, String>> ds =
                env.readCsvFile(csvUri(rootPath, "hint.csv"))
                        .types(String.class, String.class, String.class, String.class, String.class);
        System.out.println("-------------------------------");

        // The explicit type argument on project(...) names the result tuple type;
        // distinct(0) is then applied on the single remaining field.
        final DataSet<Tuple1<String>> firstColumn = ds.<Tuple1<String>>project(0);
        final DataSet<Tuple1<String>> ds2 = firstColumn.distinct(0);
        ds2.print();
    }

    /** Builds the {@code file:///} location string for a CSV file inside {@code rootPath}. */
    private static String csvUri(String rootPath, String fileName) {
        return "file:///" + rootPath + "/" + fileName;
    }

}
